SNOW-1458135 Implement DataFrame and Series initialization with lazy Index objects #2137

Open
wants to merge 60 commits into base: main

Conversation

sfc-gh-vbudati (Contributor)

  1. Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.

    Fixes SNOW-1458135

  2. Fill out the following pre-review checklist:

    • I am adding a new automated test(s) to verify correctness of my new code
      • If this test skips Local Testing mode, I'm requesting review from @snowflakedb/local-testing
    • I am adding new logging messages
    • I am adding a new telemetry message
    • I am adding new credentials
    • I am adding a new dependency
    • If this is a new feature/behavior, I'm adding the Local Testing parity changes.
  3. Please describe how your code solves the related issue.

  • Implemented functionality to enable creating Series and DataFrame objects with a lazy Index object as the data, index, and/or columns.
  • This also covers creating Series and DataFrames with rows/columns that don't exist in the given data.
  • A special case is when the data is a Series or DataFrame object: the new Series or DataFrame is created by filtering the data with the provided index and columns.
  • If some values in index don't exist in data's index, they are added as new rows whose data values are NaN.
  • If some values in columns don't exist in data's columns, they are added as new all-NaN columns.
  • Internally, a right outer join adds the new index values, and the new NaN columns are created and appended (see the native pandas illustration below).
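
For illustration, the intended semantics mirror native pandas reindexing behavior; a minimal sketch using only native pandas (so it runs without a Snowflake session):

import pandas as pd

# Source data has index [0, 1, 2] and columns ["a", "b"].
data = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})

# Index value 3 and column "c" do not exist in `data`: the new row and the
# new column are filled with NaN, matching the filtering semantics above.
result = pd.DataFrame(data, index=[0, 1, 3], columns=["a", "c"])
print(result)
#      a   c
# 0  1.0 NaN
# 1  2.0 NaN
# 3  NaN NaN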

…y-index

# Conflicts:
#	src/snowflake/snowpark/modin/plugin/compiler/snowflake_query_compiler.py
@sfc-gh-vbudati sfc-gh-vbudati added the NO-PANDAS-CHANGEDOC-UPDATES This PR does not update Snowpark pandas docs label Aug 21, 2024
@sfc-gh-vbudati (Contributor Author)

All of the join counts in the tests have increased because, during DataFrame/Series creation with a non-Snowpark pandas object as data and a Snowpark pandas Index as index, a join is now performed instead of converting the index to pandas (which would incur an extra query).

In some tests the join count is much higher, but that is due to how they are written: some tests call to_pandas() multiple times, which multiplies the join count.

@sfc-gh-azhan (Collaborator) left a comment:

Thanks for doing this! It's a lot of work btw.

Please also check:

  1. If you identify test code that can be improved, please add a TODO and track it with a Jira issue.
  2. Please run a Jenkins job before merging to see if anything goes wrong there.

tests/integ/modin/test_concat.py
@sfc-gh-azhan (Collaborator) left a comment:

The DataFrame and Series constructors are quite similar; can we reuse the code?

@@ -1073,6 +1073,11 @@ class TestSeriesGroupBy:
@pytest.mark.parametrize("by", ["string_col_1", ["index", "string_col_1"], "index"])
def test_dataframe_groupby_getitem(self, by, func, dropna, group_keys, sort):
"""Test apply() on a SeriesGroupBy that we get by DataFrameGroupBy.__getitem__"""
qc = (
6
Collaborator:

Can we keep using QUERY_COUNT_WITH_TRANSFORM_CHECK and explain why the value changed to 5?

Contributor Author:

Done! The query count went down because we no longer convert the index to pandas in the DataFrame constructor.

eval_snowpark_pandas_result(
*create_test_series({"a": range(len(datecol))}, index=datecol),
*create_test_series({"2024-01-01": list(range(len(datecol)))}, index=datecol),
Collaborator:

I forgot to check this. I'll do it today.

tests/integ/modin/series/test_iloc.py
date_index = native_pd.date_range("1/1/2010", periods=6, freq="D")
native_series = native_pd.Series(
{"prices": [100, 101, np.nan, 100, 89, 88]}, index=date_index
{"1/1/2023": [100, 101, np.nan, 100, 89, 88]}, index=date_index
Collaborator:

So this is still not able to be fixed?

src/snowflake/snowpark/modin/pandas/dataframe.py

else:
# CASE 5: Non-Snowpark pandas data
Collaborator:

why not just call from_pandas for all cases?
What is case 5.A?

Collaborator:

I believe the only special case is
Special case: data is a dictionary where all the values are Snowpark pandas Series

Contributor Author:

I refactored it all - the two special cases are a dict or a list where all the elements are Snowpark pandas objects.

src/snowflake/snowpark/modin/pandas/dataframe.py
…y-index

# Conflicts:
#	src/snowflake/snowpark/modin/pandas/dataframe.py
#	src/snowflake/snowpark/modin/plugin/extensions/index.py
@@ -1995,3 +1996,68 @@ def create_frame_with_data_columns(
def rindex(lst: list, value: int) -> int:
"""Find the last index in the list of item value."""
return len(lst) - lst[::-1].index(value) - 1


def convert_index_to_qc(index: Any) -> Any:
Contributor Author:

I'm not sure how to make the return type "SnowflakeQueryCompiler" without causing circular import issues

Collaborator:

using "SnowflakeQueryCompiler" with quotes

Contributor Author:

That does not work either - it still causes the same circular import issue.
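
One workaround worth trying (a common pattern, not necessarily what this PR ended up doing): guard the import with typing.TYPE_CHECKING so it is only evaluated by type checkers and never at runtime, e.g.:

from __future__ import annotations

from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    # Evaluated only by type checkers, never at runtime, so it cannot
    # trigger the circular import.
    from snowflake.snowpark.modin.plugin.compiler.snowflake_query_compiler import (
        SnowflakeQueryCompiler,
    )


def convert_index_to_qc(index: Any) -> SnowflakeQueryCompiler:
    """Convert an index-like object to a query compiler (body elided)."""
    ...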

if isinstance(index, DataFrame): # pandas raises the same error
raise ValueError("Index data must be 1-dimensional")

if dtype == "category":
Collaborator:

Where does this check come from? I don't see it being checked anywhere before.

Contributor Author:

I added this check because it was not performed before - we do not support the Categorical type yet. If the user passes in dtype="category", the resulting Series/DataFrame would have a categorical dtype.
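
A minimal sketch of what such a guard could look like (the helper name and error message here are hypothetical, not the PR's actual code):

import pandas as native_pd


def _raise_if_categorical(dtype) -> None:
    # Hypothetical guard: reject dtype="category" (or an explicit
    # CategoricalDtype) up front, since categorical data is not supported yet.
    if dtype is not None and isinstance(
        native_pd.api.types.pandas_dtype(dtype), native_pd.CategoricalDtype
    ):
        raise NotImplementedError("CategoricalDtype is not yet supported")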

Collaborator:

The dtype seems to be used only when data is local; typically in that case we should already apply the dtype check before uploading the data, so we shouldn't need such a check here. Is it not erroring out today?

Contributor Author:

No, it's not erroring out here today - the data itself is not categorical, but it should be treated as categorical when dtype is "category".


# -----------------------------
if query_compiler is not None:
# CASE I: query_compiler
# If a query_compiler is passed in only use the query_compiler field to create a new DataFrame.
Collaborator:

In our docs, we actually document it in a way that allows both to be provided and used, as follows:

Notes

DataFrame can be created either from passed data or query_compiler. If both parameters are provided, data source will be prioritized in the next order:

Modin DataFrame or Series passed with data parameter.

Query compiler from the query_compiler parameter.

Various pandas/NumPy/Python data structures passed with data parameter.

Please make sure the doc and the behavior are consistent.

Contributor Author:

updated the docs!

@@ -459,104 +464,222 @@ def __init__(
# TODO: SNOW-1063346: Modin upgrade - modin.pandas.DataFrame functions
# Siblings are other dataframes that share the same query compiler. We
# use this list to update inplace when there is a shallow copy.
from snowflake.snowpark.modin.pandas.utils import try_convert_index_to_native
from snowflake.snowpark.modin.plugin.extensions.index import Index
Collaborator:

Can the import be at the beginning of the file?

Contributor Author:

done!


elif isinstance(data, Series):
# CASE III: data is a Snowpark pandas Series
query_compiler = data._query_compiler.copy()
Collaborator:

Why are we making a copy of the query compiler here? The query compiler is, in general, immutable.

Contributor Author:

removed the copy logic

axis=1, labels=try_convert_index_to_native(columns)
)
# Reduce the dictionary to only the relevant columns as the keys.
data = {key: value for key, value in data.items() if key in columns}
Collaborator:

That function is becoming too long; can you see if you can break it down into separate functions like from_query_compiler, from_local_data, etc.?

Contributor Author:

I moved the special cases to their own functions instead of the local data case

@@ -17,7 +17,7 @@
)


@sql_count_checker(query_count=2, join_count=1)
@sql_count_checker(query_count=1, join_count=2)
Collaborator:

I think we should keep a couple of tests with native index objects.

@@ -25,56 +25,56 @@

@pytest.fixture(scope="function")
def df1():
return pd.DataFrame(
return native_pd.DataFrame(
Collaborator:

This change seems a little weird to me: the original code returned a Snowpark pandas DataFrame, but now we return a native pandas DataFrame here? Why is that?

Contributor Author:

I changed this because returning a Snowpark pandas DataFrame makes the query count higher, and to_pandas() is called a lot. I think it's better to convert the object to Snowpark pandas only where needed.

udtf_count=UDTF_COUNT,
join_count=JOIN_COUNT,
Collaborator:

Can you update JOIN_COUNT instead of hard-coding the join count here?

@sfc-gh-vbudati (Contributor Author), Sep 18, 2024:

In most cases the join count is still 1; I modified it to use JOIN_COUNT + 1 instead now.

# CASE I: query_compiler
# If a query_compiler is passed in, only use the query_compiler and name fields to create a new Series.
assert (
data is None
Collaborator:

These checks are very similar for Series and DataFrame; can we unify them?

Contributor Author:

done!

@sfc-gh-azhan (Collaborator) left a comment:

Let's first align with DataFrame init.

@@ -78,7 +78,7 @@ class Series(BasePandasDataset):
c 3
dtype: int64

The keys of the dictionary match with the Index values, hence the Index
The keys of the dictionary match with the Index values, hence the dictionary
Collaborator:

We should keep the original version. Why change this?

Contributor Author:

Because this description is wrong - only the "index" values are used ('x', 'y', 'z') and the dict values ('a', 'b', 'c') are ignored.

Collaborator:

This statement is talking about the example above which should be accurate.

Contributor Author:

fixed!

error_checking_for_init(index, dtype)

# The logic followed here is:
# 1. Create a query_compiler from the provided data. If columns are provided, add/select the columns.
Collaborator:

It's confusing to mix these numbers with the case numbers. Maybe emphasize that these are Step 1, Step 2, etc.

Contributor Author:

I renamed the steps, hopefully this is easier to read

if data.name is None:
# If no name is provided, the default name is 0.
query_compiler = query_compiler.set_columns(columns or [0])
if columns is not None and data.name not in columns:
Collaborator:

Should this be an elif?

Collaborator:

maybe do this

if columns is None:
    # handle all cases here
elif data.name in columns:
    # handle the case here
else:
    # handle data.name not in columns

You didn't handle this case here:
pd.DataFrame(pd.Series([1,2,3], name="b"), columns=["a", "b"])

Contributor Author:

I refactored it, it should be handled now and I added a test for it
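
For reference, this is the native pandas behavior for the case called out above (the Series name "b" fills its column and the extra requested column "a" becomes NaN); the expected output is shown in the comments:

import pandas as pd

result = pd.DataFrame(pd.Series([1, 2, 3], name="b"), columns=["a", "b"])
print(result)
#     a  b
# 0 NaN  1
# 1 NaN  2
# 2 NaN  3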


elif isinstance(data, Series):
# CASE III: data is a Snowpark pandas Series
query_compiler = data._query_compiler.copy()
Collaborator:

We don't need to copy the query compiler here, right?

Contributor Author:

removed the copy logic

extra_columns = [col for col in columns if col not in data.columns]
else:
extra_columns = []
query_compiler = data._query_compiler.create_qc_with_extra_columns(
Collaborator:

just use loc like df[extra_columns] = None.

if len(data) and all(
isinstance(v, (Index, BasePandasDataset)) for v in data
):
# Special case V.c: data is a list/dict where all the values are Snowpark pandas objects.
Collaborator:

cover them in a helper function

Contributor Author:

Moved special cases to helper functions

# The logic followed here is:
# 1. Create a query_compiler from the provided data. If columns are provided, add/select the columns.
# 2. If an index is provided, set the index through set_index or reindex.
# 3. If the data is a DataFrame, perform loc to select the required index and columns from the DataFrame.
Collaborator:

You should move all column operations into Step 1. It is confusing to select columns here again.

Contributor Author:

done!

columns=try_convert_index_to_native(columns),
dtype=dtype,
copy=copy,
# 3. If data is a DataFrame, filter result
Collaborator:

You should move all column operations into Step 1. It is confusing to select columns here again.

Contributor Author:

done!

distributed_frame = from_non_pandas(data, index, columns, dtype)
if distributed_frame is not None:
self._query_compiler = distributed_frame._query_compiler
new_name = data.name
Collaborator:

Are you missing these cases?
pd.DataFrame(pd.Index([1,2,3], name = 'b'), columns = ['a']).
pd.DataFrame(pd.Index([1,2,3], name = 'b'), columns = ['a', 'b']).

Contributor Author:

should be taken care of now


# The logic followed here is:
# STEP 1: Create a query_compiler from the provided data. If columns are provided, add/select the columns.
# STEP 2: If an index is provided, set the index through set_index or reindex.
Collaborator:

This does not match your implementation. Some index handling happens in Step 1 now.

Collaborator:

Let's implement it following these steps:

  1. handle data
  2. handle columns
  3. handle index

In Step 1, we make sure the query_compiler is updated for all lazy data. If query_compiler is None, that means the data is local; make sure to convert the data to the right native DataFrame in this step.

-- here we can test all cases of pd.DataFrame(data=any, columns=None, index=None)

In Step 2, handle columns based on whether query_compiler is None or not

-- here we can test all cases of pd.DataFrame(data=any, columns=not none, index=None)

In Step 3, similarly handle the index based on whether query_compiler is None or not.

-- here we can test all cases of pd.DataFrame(data=any, columns=any, index=any)
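
A minimal, self-contained sketch of that ordering using native pandas only (the helper name and the collapsed lazy/local branches are assumptions for illustration, not this PR's code):

from typing import Any, Optional, Sequence

import pandas as pd


def build_frame(data: Any, columns: Optional[Sequence] = None,
                index: Optional[Sequence] = None) -> pd.DataFrame:
    # Step 1: handle data. This is where a lazy Snowpark pandas input would
    # yield a query compiler; local data is normalized to a native DataFrame.
    frame = data if isinstance(data, pd.DataFrame) else pd.DataFrame(data)

    # Step 2: handle columns. Requested columns missing from the data become
    # NaN columns; columns not requested are dropped.
    if columns is not None:
        frame = frame.reindex(columns=columns)

    # Step 3: handle index.
    if index is not None:
        if isinstance(data, (pd.DataFrame, pd.Series)):
            # Data already carries an index: filter/extend to the requested
            # labels (the lazy implementation uses a join for this step).
            frame = frame.reindex(index=index)
        else:
            # Local data without a meaningful index: simply attach the labels.
            frame.index = pd.Index(index)

    return frame


# Example: extra column "b" and new index labels, filled as expected.
print(build_frame({"a": [1, 2, 3]}, columns=["a", "b"], index=["x", "y", "z"]))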

pytest.param(
"series",
marks=pytest.mark.xfail(
reason="SNOW-1675191 reindex does not work with tuple series"

@sfc-gh-yzou (Collaborator) left a comment:

@sfc-gh-vbudati @sfc-gh-azhan mentioned that the main purpose of this PR is to remove a to_pandas materialization; can we just do that in this PR and move the other refactoring out of the current PR?

name = data.name
from snowflake.snowpark.modin.plugin.extensions.index import Index

# Setting the query compiler
Collaborator:

One more general comment about the change: our original code behaves such that if both data and a query compiler are provided, the data is used.
However, here it seems we want to change it so that only one of them can be provided. I think that is fine; however, please make sure we update the doc to make this clear.

Here are a couple of points:

  1. From a structural point of view, I think we can do the parameter checks first, for example where both query_compiler and other parameters are provided, and then initialize the query_compiler following the original code structure, unless some cases work very differently.
  2. The check messages don't seem very clear. For example, "query_compiler and index can not be provided together" might be better phrased as "index is not supported when query_compiler is provided", etc.

Contributor Author:

I can make the error messages clearer as you pointed out in (2), e.g. "index is not supported when query_compiler is provided". But the parameters are currently checked before they are used, and I don't think there are any cases in the code where both a query compiler and data/index/columns are provided (no tests have failed so far in relation to this). I think this is also the simpler behavior to have.
The doc should also be updated with this behavior.


if hasattr(data, "name") and data.name is not None:
# If data is an object that has a name field, use that as the name of the new Series.
name = data.name
# If any of the values are Snowpark pandas objects, convert them to native pandas objects.
Collaborator:

In this case, shouldn't we try to convert the other values to Snowpark pandas objects instead of pulling them to local? Or maybe we should just error out.

Do you have an example of this case?

Contributor Author:

One example where it's better to convert to pandas is this:

data = {"A": pd.Series([1, 2, 3]), "B": pd.Index([4, 5, 6]), "C": 5}
pd.DataFrame(data)
Out[58]: 
   A  B  C
0  1  4  5
1  2  5  5
2  3  6  5

5 is put in every single row even though it's a scalar in the dict

@sfc-gh-vbudati (Contributor Author)

@sfc-gh-yzou I prefer not to make the refactor changes in a new PR since I think this one is very close to merging, and it would take a lot more work to separate the index changes from it.

columns = ensure_index(columns)

# The logic followed here is:
# STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, the query
Collaborator:

Suggested change
# STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, the query
# STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, keep the query

# STEP 1: Obtain the query_compiler from the provided data if the data is lazy. If data is local, the query
# compiler is None.
# STEP 2: If columns are provided, set the columns if data is lazy.
# STEP 3: If both the data and index are local (or index is None), create a query compiler from pandas.
Collaborator:

If data is local, create a query compiler from it with local index.

# compiler is None.
# STEP 2: If columns are provided, set the columns if data is lazy.
# STEP 3: If both the data and index are local (or index is None), create a query compiler from pandas.
# STEP 4: Otherwise, set the index through set_index or reindex.
Collaborator:

For lazy index, set the index through set_index or reindex.

@sfc-gh-azhan (Collaborator)

@sfc-gh-yzou I prefer not making the refactor changes in a new PR since I think this one is very close to merging and it will take a lot more work to separate the index changes from this

I kind of agree with @sfc-gh-yun that this PR is becoming too big. Can we use this as the PoC draft PR and review smaller PRs one by one? You can either start with the refactoring pieces first or fix the lazy index first. Try to make sure the refactoring PR only does refactoring and has no test changes.

@sfc-gh-vbudati (Contributor Author)

@sfc-gh-azhan @sfc-gh-yzou I can try to separate this PR into two other PRs - one for the lazy index change and the other for the refactor. It is impossible to avoid test changes in the refactor PR since I introduced functionality to allow passing non-existent columns or index values to the constructor. The constructors should be able to handle any kind of input, and I added tests for this.

However, that requires me to make a non-trivial amount of redundant code changes; for example, the same set of tests would be changed in both PRs, where the query counts will likely differ due to the refactor. I was hoping to work on IR tickets from Monday, so I still prefer merging this PR as is; please let me know if you both feel strongly about this.

In the future, I'd really appreciate it if feedback about splitting PRs were brought up earlier.

Labels: NO-PANDAS-CHANGEDOC-UPDATES (This PR does not update Snowpark pandas docs), snowpark-pandas
4 participants